package com.dahuan.source;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Random;

public class MySensor implements SourceFunction<SensorReading>{
    /**
     *
     *  1.定义一个随机数
     *  2.定义一个hashmap 增加id , 和temperature ( id为String类型, temperature为double类型 )
     *  3.建立一个for循环让id不断的增长
     *  4.将id和temperature 连接起来 temperature用正态分布(高斯分布)
     *  5.设置while循环无尽输入
     *  6.打印结果输出 用重写方法中的对象 调用collect方法输出
     *
     */

        //定义一个 任务确保是否可以运行
        Boolean isRunning = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {

            //定义一个随机数
            Random random = new Random();

            HashMap<String, Double> sensorTemp = new HashMap<>();
            for (int i = 1; i < 10; i++) {
                sensorTemp.put( "sensor_" + i, 60 + random.nextGaussian() * 20 );
                //System.out.println(sensorTemp);//TODO 结果{sensor_1=50.46646614857943, sensor_2=75.92749093814079}
            }
            while (isRunning){
                for (String sensorID : sensorTemp.keySet()) { //TODO 返回此映射中包含的键的 Set 视图。
                    //TODO System.out.println(sensorID); ID
                    //TODO 返回指定键所映射的值；如果此映射不包含该键的映射关系，则返回 null。
                    Double newTemp = sensorTemp.get( sensorID ); //TODO + random.nextGaussian();  随机数的高斯分布(正态分布)
                    System.out.println(newTemp);
                    //TODO System.out.println(newTemp); 传感器
                    //TODO sensorTemp.put( sensorID, newTemp ); 这个语句不管有没有也不影响结果
                    ctx.collect( new SensorReading(sensorID,System.currentTimeMillis(),newTemp) );

                }
                Thread.sleep( 10000 );
            }


        }

        @Override
        public void cancel() {
            //取消任务
            isRunning = false;
        }
    }

